package org.apache.spark.sql.execution.streaming
trait Sink {
def addBatch(batchId: Long, data: DataFrame): Unit
}
Streaming Sinks
A Streaming Sink represents an external storage to write streaming datasets to. It is modeled as Sink trait that can process batches of data given as DataFrames.
The following sinks are currently available in Spark:
-
ConsoleSink for
consoleformat. -
FileStreamSink for
parquetformat. -
ForeachSink used in foreach operator.
-
MemorySink for
memoryformat.
You can create your own streaming format implementing StreamSinkProvider.
Sink Contract
Sink Contract is described by Sink trait. It defines the one and only addBatch method to add data as batchId.
FileStreamSink
FileStreamSink is the streaming sink for the parquet format.
|
Caution
|
FIXME |
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
val out = in.writeStream
.format("parquet")
.option("path", "parquet-output-dir")
.option("checkpointLocation", "checkpoint-dir")
.trigger(ProcessingTime(5.seconds))
.outputMode(OutputMode.Append)
.start()
FileStreamSink supports Append output mode only.
It uses spark.sql.streaming.fileSink.log.deletion (as isDeletingExpiredLog)
MemorySink
MemorySink is an memory-based Sink particularly useful for testing. It stores the results in memory.
It is available as memory format that requires a query name (by queryName method or queryName option).
|
Tip
|
See the example in MemoryStream. |
|
Note
|
It was introduced in the pull request for [SPARK-14288][SQL] Memory Sink for streaming. |
Use toDebugString to see the batches.
Its aim is to allow users to test streaming applications in the Spark shell or other local tests.
You can set checkpointLocation using option method or it will be set to spark.sql.streaming.checkpointLocation setting.
If spark.sql.streaming.checkpointLocation is set, the code uses $location/$queryName directory.
Finally, when no spark.sql.streaming.checkpointLocation is set, a temporary directory memory.stream under java.io.tmpdir is used with offsets subdirectory inside.
|
Note
|
The directory is cleaned up at shutdown using ShutdownHookManager.registerShutdownDeleteDir.
|
val nums = spark.range(10).withColumnRenamed("id", "num")
scala> val outStream = nums.writeStream
.format("memory")
.queryName("memStream")
.start()
16/04/11 19:37:05 INFO HiveSqlParser: Parsing command: memStream
outStream: org.apache.spark.sql.StreamingQuery = Continuous Query - memStream [state = ACTIVE]
It creates MemorySink instance based on the schema of the DataFrame it operates on.
It creates a new DataFrame using MemoryPlan with MemorySink instance created earlier and registers it as a temporary table (using DataFrame.registerTempTable method).
|
Note
|
At this point you can query the table as if it were a regular non-streaming table using sql method. |
A new StreamingQuery is started (using StreamingQueryManager.startQuery) and returned.
|
Caution
|
FIXME Describe else part.
|